package que.app.dwd.db;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import que.app.SqlApp;

/**
 * @author Naruto
 * @description: 一条数据为一人的一次考试的答卷信息
 * @Class_Name que.app.dwd.db.DwsExamInfo
 * @Date 2022/09/02 23:10
 */
public class DwsExamInfo extends SqlApp {
    public static void main(String[] args) {
        new DwsExamInfo().init(
                2,
                9006,
        "DwsExamInfo");
    }
    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {
        readOdsDb(tEnv,"DwsExamInfo");

        //一张答卷
        Table testExam = tEnv.sqlQuery(
                "select " +
                        "data['id'] id, " +
                        "data['user_id'] user_id, " +
                        "data['paper_id'] paper_id, " +
                        "data['score'] score, " +
                        "data['duration_sec'] duration_sec, " +
                        "ts, " +
                        "pt " +
                        "from ods_db " +
                        "where `database` = 'edu' " +
                        "and `table` = 'test_exam' " +
                        "and ( " +
                        "   `type` = 'insert'"
        );

        tEnv.createTemporaryView("test_exam",testExam);
    }



}
